
perf(store): save metadata async#3298

Merged
julienrbrt merged 9 commits into main from julien/async-metadata
Apr 29, 2026

Conversation

@julienrbrt
Member

@julienrbrt julienrbrt commented Apr 28, 2026

Overview

Improve the store by letting SetMetadata happen asynchronously (used in the submitter and cache).
Metadata is still crucial, as we save the last included height, some cache backup, and pruning info.
However, we can queue the operation when a component is bottlenecked by heavy writes (the fast submitter competes with the executor for db locks).

A pkill -9 could leave the metadata in a weird state, but the system should auto-recover with just a little extra work (double submission -- fine, as the syncer can handle it; pruning nonexistent blocks -- fine, as the pruner can handle it).


AI summary of the change:
Problem: All block/ components share the same Badger-backed store. When the submitter or pruner writes metadata (SetMetadata/DeleteMetadata), they block Badger's write pipeline, slowing down the executor's and syncer's critical batch commits (SaveBlockData + SetHeight + UpdateState + Commit).
Solution: Async metadata writes in CachedStore (pkg/store/cached_store.go):
- SetMetadata and DeleteMetadata now enqueue operations to a buffered channel (8192 slots) instead of writing synchronously
- A background goroutine (auto-started in NewCachedStore) drains the channel and persists writes to the underlying store
- If the buffer is full, writes fall back to synchronous (backpressure)
- Close() drains all pending writes before closing the underlying store
- All other operations (reads, batches, pruning, rollback) remain synchronous pass-throughs
Why this works:
- Executor/Syncer use NewBatch/Commit for block writes — these remain synchronous and are never blocked by metadata writes
- Submitter's processDAInclusionLoop does 3-4 SetMetadata calls per height (DA inclusion height, height-to-DA mappings) — now async, non-blocking
- Cache's setLastSubmittedHeight writes after every DA submission — now async
- Pruner's metadata writes — now async. Heavy operations (PruneBlocks, DeleteStateAtHeight) remain sync
- No changes to components.go — CachedStore is already the wrapper used everywhere

Summary by CodeRabbit

  • New Features

    • Asynchronous metadata writes for faster, non-blocking updates
    • Atomic batch metadata updates for consistent bulk changes
    • Tracing for improved observability of metadata operations
  • Tests

    • New tests validating async metadata propagation, coalescing semantics, shutdown/flush behavior, and post-close errors
  • Chore

    • Changelog updated to document the async metadata optimization

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 28, 2026

📝 Walkthrough

Walkthrough

CachedStore adds an asynchronous metadata writer: a buffered channel and background goroutine that batches, deduplicates, and persists metadata puts/deletes. New SetMetadata/DeleteMetadata enqueue ops (with sync fallback when stopped). Close() stops enqueueing, drains the writer, clears caches, and closes the underlying store.

Changes

Cohort / File(s) | Summary
Cached store async logic (pkg/store/cached_store.go) | Adds buffered writeCh, a startWriteLoop writer goroutine, asyncWriteOp batching/deduplication, public SetMetadata() and DeleteMetadata() that enqueue ops (copying value bytes), a synchronous fallback when stopped, and updates Close() to stop, close the channel, wait for the writer to drain, then clear caches and close the underlying store.
Store metadata batch API & tracing (pkg/store/types.go, pkg/store/store.go, pkg/store/tracing.go) | Adds the MetadataKV type and the Metadata.BatchMetadata(ctx, puts, deletes) error interface method; implements DefaultStore.BatchMetadata using backend batch operations with per-op error wrapping; adds tracedStore.BatchMetadata that wraps calls in an OpenTelemetry span with attributes for put/delete counts and error recording.
Tests and mocks (pkg/store/cached_store_test.go, pkg/store/tracing_test.go, test/mocks/store.go) | Adds tests for async metadata propagation, coalescing semantics, shutdown/drain flush, and post-Close behavior; extends the mock store with BatchMetadata support and expectation/callback wiring.
Changelog (CHANGELOG.md) | Adds an Unreleased entry documenting the async metadata write optimization (PR #3298).

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant CachedStore
    participant Channel as Async Channel
    participant Writer as Writer Goroutine
    participant BaseStore

    Client->>CachedStore: SetMetadata(key, value)
    alt Channel has room & not stopped
        CachedStore->>Channel: Enqueue asyncWriteOp
        Channel-->>Writer: Deliver op
        Writer->>BaseStore: BatchMetadata(context.Background(), puts, deletes)
        BaseStore-->>Writer: Success / Error
    else Channel full or stopped
        CachedStore->>BaseStore: SetMetadata (synchronous fallback)
        BaseStore-->>CachedStore: Result
    end
    CachedStore-->>Client: Return
sequenceDiagram
    participant Client
    participant CachedStore
    participant Channel as Write Channel
    participant Writer as Writer Goroutine
    participant BaseStore

    Client->>CachedStore: Close()
    CachedStore->>CachedStore: mark stopped, stop enqueueing
    CachedStore->>Channel: close channel
    CachedStore->>Writer: wait for writer to exit
    Writer->>Writer: drain queued operations
    loop For each remaining op (batched)
        Writer->>BaseStore: BatchMetadata(context.Background(), puts, deletes)
        BaseStore-->>Writer: Result
    end
    Writer-->>CachedStore: exit
    CachedStore->>CachedStore: clear caches
    CachedStore->>BaseStore: Close()
    BaseStore-->>CachedStore: closed
    CachedStore-->>Client: Return

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐇 I nibble bits and queue them light,
a busy loop works through the night;
set, delete, then set once more—
batched and gentle through the store. 🌿

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 14.29%, which is below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The title 'perf(store): save metadata async' clearly and concisely summarizes the main change: converting synchronous metadata writes to asynchronous operations for performance.
Description check | ✅ Passed | The description provides comprehensive context, including the problem (contention on the shared Badger store), the solution (async metadata writes with fallback), failure-mode tolerance, and implementation details with component-level impact.
Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request.


@github-actions
Contributor

github-actions Bot commented Apr 28, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed | Format: ⏩ skipped | Lint: ✅ passed | Breaking: ✅ passed | Updated (UTC): Apr 29, 2026, 10:15 AM

@julienrbrt julienrbrt marked this pull request as ready for review April 28, 2026 15:23
@claude
Contributor

claude Bot commented Apr 28, 2026

Claude finished @julienrbrt's task in 2m 29s.


Code Review: perf(store): save metadata async

  • Read CLAUDE.md / AGENTS.md for repo guidelines
  • Read changed files
  • Analyze async metadata write implementation
  • Review concurrency patterns and safety
  • Review error handling
  • Review tests
  • Post comprehensive review

Summary

The overall design is sound and the motivating problem is real: the submitter's 3–4 SetMetadata calls per height were contending on the same Badger write pipeline as the executor's critical batch commits. The async buffering approach is the right fix. Key correctness issues were caught and addressed during review (slice aliasing → valueCopy, coalescing → last map, drain loop → comma-ok). What follows is the remaining analysis.


Critical / Major

1. Potential deadlock in Close() when the write buffer is full

pkg/store/cached_store.go:282

func (cs *CachedStore) Close() error {
    cs.stopMu.Lock()   // ← needs exclusive lock
    cs.stopped = true
    close(cs.writeCh)
    cs.stopMu.Unlock()
    <-cs.done
    ...
}
func (cs *CachedStore) SetMetadata(...) error {
    cs.stopMu.RLock()          // ← holds shared lock
    defer cs.stopMu.RUnlock()
    ...
    cs.writeCh <- asyncWriteOp{...}  // ← BLOCKING if buffer full
    return nil
}

When the 8192-slot buffer is full, any goroutine blocked on cs.writeCh <- holds an RLock. Close() cannot acquire the write lock until all RLocks are released. Those RLocks are only released once the writer goroutine drains enough space — so this resolves in normal operation. However, if the writer goroutine panics or is killed while callers are blocked on the channel, Close() deadlocks forever (no <-cs.done will ever unblock). Even in the normal path, Close() stall duration is unbounded.

chatton raised this exact concern. julienrbrt confirmed the blocking send is intentional for ordering, but the liveness risk during Close() is real. A mitigation would be to add a ctx-based timeout to <-cs.done:

func (cs *CachedStore) Close() error {
    cs.stopMu.Lock()
    cs.stopped = true
    close(cs.writeCh)
    cs.stopMu.Unlock()
    
    select {
    case <-cs.done:
    case <-time.After(5 * time.Second):
        cs.logger.Warn().Msg("timed out waiting for async metadata writes to drain")
    }
    ...
}


2. Read-after-write staleness is invisible to callers

CachedStore.GetMetadata delegates synchronously to the underlying store (cs.Store.GetMetadata), but SetMetadata only enqueues the write. Any code that writes then immediately reads back a metadata key — even within the same goroutine — may observe the old value:

cs.SetMetadata(ctx, "da-height", bz)  // enqueued, not yet persisted
val, _ := cs.GetMetadata(ctx, "da-height")  // reads from DB, sees old value

The PR description acknowledges this in the context of pkill -9 recovery, but the stale-read problem also applies during normal operation. It would be worth auditing callers (submitter, cache) to confirm none does a read-after-write on the same key within the same control flow. Documenting this as a contract on CachedStore.SetMetadata would also help future readers.

3. context.Context cancellation is silently dropped in async writes

SetMetadata(ctx, ...) accepts a context, but the background worker calls cs.BatchMetadata(context.Background(), ...). If a caller passes a deadline-bearing context expecting the write to cancel on timeout, it won't. This isn't a bug given the design intent, but the method signature implies context-awareness that doesn't exist. Consider documenting this in the method comment, or renaming/dropping the ctx parameter in the async path.


Minor

4. Missing t.Parallel() in two tests

pkg/store/cached_store_test.go:320 and :369: TestCachedStore_Close_FlushesPendingWrites and TestCachedStore_CoalescesSameKeyOps are missing t.Parallel(), while all other tests in the file use it. TestCachedStore_Close_FlushesPendingWrites uses t.TempDir(), which is safe for parallel execution.

5. TestCachedStore_CoalescesSameKeyOps creates CachedStore with nil LRU caches

cs := &CachedStore{
    Store:   base,
    writeCh: writeCh,
    done:    done,
    logger:  zerolog.Nop(),
}

headerCache and blockDataCache are nil. Any call to GetHeader or GetBlockData in a future refactor of this test will panic. Prefer using NewCachedStore(base) and accessing internal fields only for what strictly needs bypassing, or accept the fragility as a test-internal detail.

6. Silent async errors when logger is zerolog.Nop()

The logger defaults to zerolog.Nop(), so async BatchMetadata failures (cached_store.go:154) are silently dropped. The existing WithHeaderCacheSize/WithBlockDataCacheSize option pattern is already in place — a WithLogger(logger zerolog.Logger) option following the same pattern would make errors observable without requiring large API changes. This is a minimal addition.

7. Timer resource cleanup in startWriteLoop

When the <-timer.C case fires and break collect exits the inner loop, timer.Stop() is not called. Per the Go docs this is safe because the timer has already fired and its channel has been received, so there is no resource to reclaim. This is correct. Mentioning it only so future readers don't add an unnecessary timer.Stop() on the timer-fire path.


Positive Observations

  • Value copy before enqueue (valueCopy := append([]byte(nil), value...)) correctly prevents the background goroutine from observing mutated caller memory.
  • Last-write-wins coalescing (last[o.key] = o) correctly handles Delete("k") followed by Set("k", v) in the same batch window — the Set wins.
  • Comma-ok drain loop (case op, ok := <-cs.writeCh:) correctly handles the closed-channel case in both drain paths.
  • Close() drain-before-close ensures no pending writes are dropped on graceful shutdown.
  • batchWindow timer (100µs) is a thoughtful coalescing optimization that matches the submitter's 3–4 ops-per-height burst pattern.
  • Tracing via tracedStore.BatchMetadata integrates cleanly with the existing OTel pattern.
  • Mock generation via test/mocks/store.go is correct and complete.

Overall: the implementation is solid and the critical bugs identified in earlier review rounds have been fixed. The main open concern is the Close() liveness issue under backpressure, which is worth a small mitigation. The read-after-write stale-read semantics should at minimum be documented on SetMetadata.

@codecov

codecov Bot commented Apr 28, 2026

Codecov Report

❌ Patch coverage is 63.09524% with 31 lines in your changes missing coverage. Please review.
✅ Project coverage is 62.47%. Comparing base (81c1c25) to head (c24f901).
⚠️ Report is 2 commits behind head on main.

Files with missing lines | Patch % | Lines
pkg/store/tracing.go | 0.00% | 14 Missing ⚠️
pkg/store/store.go | 33.33% | 5 Missing and 5 partials ⚠️
pkg/store/cached_store.go | 87.27% | 5 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3298      +/-   ##
==========================================
- Coverage   62.52%   62.47%   -0.05%     
==========================================
  Files         122      122              
  Lines       13047    13131      +84     
==========================================
+ Hits         8158     8204      +46     
- Misses       4004     4034      +30     
- Partials      885      893       +8     
Flag | Coverage Δ
combined | 62.47% <63.09%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown.

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/store/cached_store.go (1)

91-98: ⚠️ Potential issue | 🟠 Major

Surface async metadata persistence failures via Close() or another observable path.

Enqueued metadata writes (SetMetadata, DeleteMetadata) return nil immediately. The background worker only logs failures, and the default logger is zerolog.Nop(), which silently discards errors. This means metadata persistence can fail undetected—no returned error, no emitted logs. Surface at least the first async writer error via Close() and make the logger configurable via a CachedStoreOption.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/cached_store.go` around lines 91 - 98, The background writer
currently drops async metadata errors (writes via writeCh from
SetMetadata/DeleteMetadata) and uses zerolog.Nop() by default; modify
CachedStore to accept a CachedStoreOption to set logger (replace zerolog.Nop())
and add an internal field to capture the first async write error (e.g.,
firstWriteErr protected by a mutex or a buffered error channel). Update the
background worker that processes asyncWriteOp to record the first non-nil error
into that field instead of only logging, close the worker and signal completion
as before, and change Close() to wait for the worker to finish and return the
captured error (if any). Ensure SetMetadata/DeleteMetadata still enqueue
immediately but that callers can observe persistence failures via Close() (or a
future exported accessor) and that the new option allows injecting a real logger
for immediate visibility.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/store/cached_store_test.go`:
- Around line 332-338: The test currently waits for the last write via
require.Eventually before calling cs.Close(), which prevents exercising
Close()'s flush path; change the flow to enqueue all writes (the loop that
writes keys to cs), call cs.Close() immediately (without waiting), then assert
directly against the underlying base store (e.g., the test's base store variable
or method such as base.GetMetadata / base data structure) that the queued writes
were flushed after Close(); replace the require.Eventually check against
cs.GetMetadata with a post-Close assertion on base to verify Close() drained the
queue.

In `@pkg/store/cached_store.go`:
- Around line 222-235: SetMetadata and DeleteMetadata currently enqueue by
reference and fall back to synchronous writes when writeCh is full, causing
ordering, aliasing and silent-failure issues; fix by making the enqueue blocking
(remove the non-blocking default path so older queued ops always run first),
copy the value slice into the asyncWriteOp before sending (avoid slice
aliasing), and extend asyncWriteOp with a response channel (e.g., resp chan
error) so callers block on the resp and return any error produced by the
background processor; apply the same changes to DeleteMetadata and update the
background goroutine to send the operation result on resp after executing
Store.SetMetadata/Store.DeleteMetadata to preserve ordering and surface async
errors.
- Around line 222-235: CachedStore.SetMetadata enqueues the caller's []byte
directly which can be mutated after return; make a defensive copy of value
before sending on cs.writeCh (e.g. newSlice := append([]byte(nil), value...))
and set asyncWriteOp.value to that copy so the writer goroutine sees an
immutable snapshot; update the enqueue path in SetMetadata to copy value before
creating/sending asyncWriteOp (retain the direct sync path unchanged).


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 00d3f79c-48ff-4ac8-bc17-c89e521974e8

📥 Commits

Reviewing files that changed from the base of the PR and between 389e904 and f7ea4ad.

📒 Files selected for processing (2)
  • pkg/store/cached_store.go
  • pkg/store/cached_store_test.go

@julienrbrt julienrbrt marked this pull request as draft April 28, 2026 15:44
@julienrbrt julienrbrt marked this pull request as ready for review April 28, 2026 17:27
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/store/cached_store.go`:
- Around line 126-136: The conversion from ops to puts/deletes drops per-key
ordering so a Delete then Set for the same key can end up applied as delete; fix
by collapsing ops per key to the last mutation before calling BatchMetadata:
iterate the ops slice (from first to last), maintain a map[string]opInfo (or
simple marker) keyed by o.key that records the final operation and value, then
after the pass produce the puts []MetadataKV and deletes []string from that map
(for a final Set produce a put, for a final Delete produce a delete) and call
cs.Store.BatchMetadata with those; update the code around ops, MetadataKV, and
cs.Store.BatchMetadata to use this coalescing logic.
- Around line 117-123: The drain loop that reads from cs.writeCh should detect a
closed channel using the comma-ok form instead of a plain receive to avoid
receiving zero-value asyncWriteOp forever; update the select case in the loop
(the one labeled drain that currently does `case op := <-cs.writeCh:`) to `case
op, ok := <-cs.writeCh:` and if !ok break drain, otherwise append op to ops;
apply the same change to the other symmetric drain at the other location so the
goroutine can exit and the done channel can be closed on shutdown.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 59e07316-b8a3-4aed-bf6e-5de11a84ea60

📥 Commits

Reviewing files that changed from the base of the PR and between f7ea4ad and 6c53bc4.

📒 Files selected for processing (7)
  • CHANGELOG.md
  • pkg/store/cached_store.go
  • pkg/store/store.go
  • pkg/store/tracing.go
  • pkg/store/tracing_test.go
  • pkg/store/types.go
  • test/mocks/store.go
✅ Files skipped from review due to trivial changes (1)
  • CHANGELOG.md

@julienrbrt julienrbrt requested a review from tac0turtle April 28, 2026 18:01
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
pkg/store/cached_store_test.go (1)

320-358: ⚠️ Potential issue | 🟡 Minor

Test bypasses Close() and doesn't verify the advertised flush behavior.

The test manually sets cs.stopped = true and closes writeCh instead of calling cs.Close(). This doesn't verify that Close() actually drains pending writes—it only confirms the write loop processes ops before exiting. To properly test the contract, enqueue writes and call cs.Close() immediately, then assert that all queued writes are present in base.


🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/cached_store_test.go` around lines 320 - 358, The test incorrectly
bypasses CachedStore.Close() by setting cs.stopped and closing writeCh directly;
update TestCachedStore_Close_FlushesPendingWrites to enqueue the writes into cs
(using cs.SetMetadata), then call cs.Close() immediately (instead of
manipulating stopMu/stopped/writeCh), and finally verify via base.GetMetadata
that all n writes were flushed; reference CachedStore.Close, cs.Close(),
cs.SetMetadata, base.GetMetadata, startWriteLoop and remove the manual
stopMu/stopped/writeCh manipulation so the test validates Close()'s flush
behavior.
🧹 Nitpick comments (2)
pkg/store/cached_store_test.go (1)

360-375: Test name is misleading.

The test is named WriteAfterClose_FallsBack but actually verifies that writes fail after close. The "fallback" path (synchronous write to underlying store) also fails because the underlying store is already closed. Consider renaming to WriteAfterClose_Errors for clarity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/cached_store_test.go` around lines 360 - 375, Rename the test
function TestCachedStore_WriteAfterClose_FallsBack to
TestCachedStore_WriteAfterClose_Errors to reflect that it asserts writes error
after Close rather than falling back; update the function declaration name and
any test references (imports, suite registrations, or calls) that refer to
TestCachedStore_WriteAfterClose_FallsBack, keeping the body intact which uses
NewCachedStore, SetMetadata, and Close to perform the assertions.
pkg/store/cached_store.go (1)

117-162: Consider adding a shutdown context for graceful termination.

The write loop uses context.Background() for BatchMetadata calls. While this is intentional for decoupled async writes, it means shutdown cannot be bounded by a deadline. If the underlying store becomes unresponsive during shutdown, <-cs.done in Close() could block indefinitely.

Consider storing a cancellable context (created with a timeout in Close()) that can be used to bound the final drain operation.

Sketch of approach
 type CachedStore struct {
 	Store
 	// ...
+	shutdownCtx    context.Context
+	cancelShutdown context.CancelFunc
 }

 func NewCachedStore(store Store, opts ...CachedStoreOption) (*CachedStore, error) {
+	ctx, cancel := context.WithCancel(context.Background())
 	cs := &CachedStore{
 		// ...
+		shutdownCtx:    ctx,
+		cancelShutdown: cancel,
 	}
 	// ...
 }

 func (cs *CachedStore) Close() error {
+	// Give pending writes a bounded time to complete
+	ctx, cancel := context.WithTimeout(cs.shutdownCtx, 5*time.Second)
+	defer cancel()
 	cs.stopMu.Lock()
 	cs.stopped = true
 	close(cs.writeCh)
 	cs.stopMu.Unlock()
-	<-cs.done
+	select {
+	case <-cs.done:
+	case <-ctx.Done():
+		cs.logger.Warn().Msg("timed out waiting for async writes to drain")
+	}
 	// ...
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/cached_store.go` around lines 117 - 162, The async write loop in
CachedStore.startWriteLoop uses context.Background() for BatchMetadata which
prevents bounding shutdown; change CachedStore to hold a cancellable shutdown
context (e.g., shutdownCtx and shutdownCancel) and update startWriteLoop to use
that context for BatchMetadata calls instead of context.Background(); in Close()
create a context with timeout, assign it to shutdownCtx (or call shutdownCancel
after closing writeCh), then attempt to drain writes using that bounded context
and ensure <-cs.done unblocks when the final BatchMetadata returns or the
timeout triggers; update references: startWriteLoop, BatchMetadata, cs.done, and
Close to coordinate with the new shutdown context and cancellation semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/store/cached_store.go`:
- Around line 256-266: The current CachedStore.SetMetadata uses a blocking send
to cs.writeCh (asyncWriteOp) which contradicts the PR description's required
backpressure fallback; change the send to a non-blocking select that attempts to
enqueue the asyncWriteOp to writeCh and, if the channel is full, synchronously
call the underlying Store.SetMetadata(ctx, key, valueCopy) instead (preserve
stopMu read lock and the stopped check, and keep copying value into valueCopy
before either enqueuing or falling back).


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b61d0e08-cf11-43ba-80f0-2aa9078917c6

📥 Commits

Reviewing files that changed from the base of the PR and between 6c53bc4 and bd84902.

📒 Files selected for processing (2)
  • pkg/store/cached_store.go
  • pkg/store/cached_store_test.go

Comment thread pkg/store/cached_store.go
Comment on lines +256 to +266
func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error {
	cs.stopMu.RLock()
	defer cs.stopMu.RUnlock()

	if cs.stopped {
		return cs.Store.SetMetadata(ctx, key, value)
	}
	valueCopy := append([]byte(nil), value...)
	cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}
	return nil
}
Contributor

@coderabbitai coderabbitai Bot Apr 28, 2026


⚠️ Potential issue | 🟠 Major

Blocking send contradicts PR description's backpressure fallback.

The PR description states "When the buffer is full, operations fall back to synchronous writes (backpressure)". However, the current implementation uses a blocking send (cs.writeCh <- asyncWriteOp{...}) which will block the caller indefinitely if the 8192-slot buffer is full, rather than falling back to synchronous writes.

If the fallback behavior is intended, use a non-blocking send:

Proposed fix for fallback behavior
 func (cs *CachedStore) SetMetadata(ctx context.Context, key string, value []byte) error {
 	cs.stopMu.RLock()
 	defer cs.stopMu.RUnlock()

 	if cs.stopped {
 		return cs.Store.SetMetadata(ctx, key, value)
 	}
 	valueCopy := append([]byte(nil), value...)
-	cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}
-	return nil
+	select {
+	case cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}:
+		return nil
+	default:
+		return cs.Store.SetMetadata(ctx, key, value)
+	}
 }

If blocking is intentional (for guaranteed ordering), update the PR description to reflect this.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/cached_store.go` around lines 256 - 266, The current
CachedStore.SetMetadata uses a blocking send to cs.writeCh (asyncWriteOp) which
contradicts the PR description's required backpressure fallback; change the send
to a non-blocking select that attempts to enqueue the asyncWriteOp to writeCh
and, if the channel is full, synchronously call the underlying
Store.SetMetadata(ctx, key, valueCopy) instead (preserve stopMu read lock and
the stopped check, and keep copying value into valueCopy before either enqueuing
or falling back).

Member Author


this is intentional.

Contributor


@julienrbrt, understood — the blocking send is intentional to ensure ordered and guaranteed delivery of metadata write operations to the background goroutine. I'll note this for future reviews.

( ˘•ω•˘ )


✏️ Learnings added
Learnt from: julienrbrt
Repo: evstack/ev-node PR: 3298
File: pkg/store/cached_store.go:256-266
Timestamp: 2026-04-28T21:47:13.754Z
Learning: In `evstack/ev-node`, `CachedStore.SetMetadata` and `CachedStore.DeleteMetadata` in `pkg/store/cached_store.go` use a **blocking** send to `cs.writeCh` intentionally. Despite the PR description mentioning a "backpressure fallback to synchronous writes", the blocking behavior is a deliberate design choice for guaranteed ordering. Do not flag the blocking channel send as a bug or suggest replacing it with a non-blocking select fallback.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: alpe
Repo: evstack/ev-node PR: 3131
File: block/internal/syncing/syncer_backoff_test.go:241-245
Timestamp: 2026-03-06T09:40:36.029Z
Learning: In evstack/ev-node, the module declares go 1.25.6. Since Go 1.22, loop variables are per-iteration by default, so loop variable capture is not a concern. Do not flag or fix loop variable capture in this codebase for any Go files; this rule applies to all Go files in the repository, not just the specific test file.

Contributor

@chatton chatton left a comment


Looks good. It might be worthwhile to add a test case which sends to a full channel and then calls Close; I think it will stall.

After that LGTM

Comment thread pkg/store/cached_store.go
Comment on lines +124 to +130
collect:
	for {
		select {
		case op, ok := <-cs.writeCh:
			if !ok {
				timer.Stop()
				break collect
Contributor


can we replace this with a separate function to avoid using a goto?

Member Author


there isn't a goto actually. it is just a labeled break to exit the inner loop without returning. do you really hate it? personally i don't really like many tiny functions.

Comment thread pkg/store/cached_store.go
		return cs.Store.SetMetadata(ctx, key, value)
	}
	valueCopy := append([]byte(nil), value...)
	cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}
Contributor


should we do something like this

      select {
      case cs.writeCh <- asyncWriteOp{key: key, value: valueCopy}:
          return nil
      default:
          return cs.Store.SetMetadata(ctx, key, valueCopy)
      }

If the channel is full, this will stall and it will not be possible to call Close(), which acquires the same lock

Same applies for DeleteMetadata

Member Author


Yeah, see #3298 (comment), this is to keep ordering.
If this is full (which should be unlikely), the submitter / pruner will take longer, while saving the pending keys to the db.

Ordering is important to not get stale values.

Comment thread pkg/store/cached_store.go
	return cs, nil
}

func (cs *CachedStore) startWriteLoop() {
Contributor


what is the performance gain from this change? also, what is the performance difference with non-automatic fsync on the db? we should be able to write to the db fast: it keeps writes in cache and then flushes every n blocks. Are we already doing that?

Member Author


Badger already has non-automatic fsync, so that would be the same.
The issue is that even with multiple concurrent writers (submitter + executor), writing to the WAL requires a short lock.

Contributor

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (2)
pkg/store/cached_store_test.go (1)

320-350: Add t.Parallel() for consistency.

This test uses t.TempDir() which creates a unique directory per test, so it's safe to run in parallel. All other tests in this file use t.Parallel().

Proposed fix
 func TestCachedStore_Close_FlushesPendingWrites(t *testing.T) {
+	t.Parallel()
 	ctx := context.Background()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/cached_store_test.go` around lines 320 - 350, In
TestCachedStore_Close_FlushesPendingWrites add t.Parallel() as the first call in
the test so it runs concurrently like the other tests; update the
TestCachedStore_Close_FlushesPendingWrites function to invoke t.Parallel()
(before using t.TempDir()/creating stores) to allow safe parallel execution.
pkg/store/cached_store.go (1)

97-104: Consider adding a WithLogger option to surface async batch failures.

The logger defaults to zerolog.Nop(), so async batch write failures in startWriteLoop (line 155-158) are silently dropped. While the PR notes that metadata is recoverable on restart, having visibility into failures could aid debugging.

Example option
// WithLogger sets the logger for async write errors.
func WithLogger(logger zerolog.Logger) CachedStoreOption {
	return func(cs *CachedStore) error {
		cs.logger = logger
		return nil
	}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/cached_store.go` around lines 97 - 104, The CachedStore currently
defaults logger to zerolog.Nop(), so errors from async batch writes in
startWriteLoop are swallowed; add a CachedStoreOption type and implement
WithLogger(logger zerolog.Logger) that sets the CachedStore.logger field, apply
option functions when constructing the CachedStore in NewCachedStore (or
equivalent) so callers can inject a real logger, and ensure startWriteLoop uses
cs.logger to log async write failures; keep zerolog.Nop() as the default when no
option is provided.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@pkg/store/cached_store_test.go`:
- Around line 320-350: In TestCachedStore_Close_FlushesPendingWrites add
t.Parallel() as the first call in the test so it runs concurrently like the
other tests; update the TestCachedStore_Close_FlushesPendingWrites function to
invoke t.Parallel() (before using t.TempDir()/creating stores) to allow safe
parallel execution.

In `@pkg/store/cached_store.go`:
- Around line 97-104: The CachedStore currently defaults logger to
zerolog.Nop(), so errors from async batch writes in startWriteLoop are
swallowed; add a CachedStoreOption type and implement WithLogger(logger
zerolog.Logger) that sets the CachedStore.logger field, apply option functions
when constructing the CachedStore in NewCachedStore (or equivalent) so callers
can inject a real logger, and ensure startWriteLoop uses cs.logger to log async
write failures; keep zerolog.Nop() as the default when no option is provided.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2369a42f-48d2-495d-ab84-df359728d35b

📥 Commits

Reviewing files that changed from the base of the PR and between bd84902 and c24f901.

📒 Files selected for processing (3)
  • CHANGELOG.md
  • pkg/store/cached_store.go
  • pkg/store/cached_store_test.go
✅ Files skipped from review due to trivial changes (1)
  • CHANGELOG.md

@julienrbrt julienrbrt added this pull request to the merge queue Apr 29, 2026
Merged via the queue into main with commit fdc79ad Apr 29, 2026
36 of 37 checks passed
@julienrbrt julienrbrt deleted the julien/async-metadata branch April 29, 2026 11:59